package com.jingge.autojob.skeleton.framework.network.client;

import com.jingge.autojob.skeleton.cluster.model.ClusterNode;
import com.jingge.autojob.skeleton.framework.network.handler.client.RPCClientProxy;
import com.jingge.autojob.skeleton.lang.AutoJobException;
import com.jingge.autojob.skeleton.model.register.IAutoJobRegister;
import com.jingge.autojob.util.cache.LocalCacheManager;
import com.jingge.autojob.util.convert.StringUtils;
import net.jodah.expiringmap.ExpirationPolicy;

import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * RPC client that caches one {@link RPCClientProxy} per cluster node.
 *
 * <p>Proxies are stored in a {@link LocalCacheManager} keyed by {@code ClusterNode.toString()}.
 * The cache is built with a maximum length and an {@link ExpirationPolicy#ACCESSED} policy; the
 * registered expiration listener destroys every proxy that the cache expires.
 *
 * <p>NOTE(review): this class performs no synchronization of its own. Two threads asking for the
 * same uncached node at the same time may each create a proxy, and only the one stored last stays
 * in the cache — confirm whether callers need stronger guarantees.
 *
 * @param <T> the RPC interface type the proxies implement
 * @author JingGe(* ^ ▽ ^ *)
 * @date 2023-09-22 10:26
 * @email 1158055613@qq.com
 */
@SuppressWarnings("unchecked")
public class CachedProxyClient<T> implements Closeable {
    /** Cache of live proxies, keyed by {@code ClusterNode.toString()}. */
    protected LocalCacheManager<String, RPCClientProxy<T>> cacheManager;
    /** The RPC interface type handed to every newly created {@link RPCClientProxy}. */
    protected Class<T> type;

    /**
     * Creates a client whose cache holds at most {@code maximumAliveSize} proxies.
     *
     * @param maximumAliveSize   maximum length passed to the cache builder
     * @param proxyInterfaceType the RPC interface type used when creating proxies
     */
    public CachedProxyClient(int maximumAliveSize, Class<T> proxyInterfaceType) {
        cacheManager = LocalCacheManager
                .builder()
                .setMaxLength(maximumAliveSize)
                .setPolicy(ExpirationPolicy.ACCESSED)
                .addAsyncExpirationListener((key, proxy) -> {
                    // Release the proxy's resources once the cache drops it.
                    if (proxy != null) {
                        ((RPCClientProxy<T>) proxy).destroyProxy();
                    }
                })
                .setExpiringTime(Long.MAX_VALUE, TimeUnit.SECONDS)
                .build();
        type = proxyInterfaceType;
    }

    /**
     * Returns the client proxy for the given node, creating and caching the underlying
     * {@link RPCClientProxy} when none is cached yet.
     *
     * @param clusterNode the target node; must be non-null with a non-empty host and a non-null port
     * @return the client proxy for the node
     * @throws IllegalArgumentException if the node, its host or its port is missing
     */
    public T getInstance(ClusterNode clusterNode) {
        requireValidNode(clusterNode);
        String key = clusterNode.toString();
        RPCClientProxy<T> cached = findCached(key);
        if (cached != null) {
            return cached.clientProxy();
        }
        return createAndCache(clusterNode, key).clientProxy();
    }

    /**
     * Returns the client proxy for the given node using the supplied timeouts, creating and caching
     * the underlying {@link RPCClientProxy} when none is cached yet.
     *
     * <p>The timeouts are passed to {@code clientProxy(connectTimeout, readTimeout, unit)} both for
     * a freshly created proxy and for a cached one, so a cache hit no longer ignores them.
     *
     * @param clusterNode    the target node; must be non-null with a non-empty host and a non-null port
     * @param connectTimeout connect timeout forwarded to the proxy
     * @param readTimeout    read timeout forwarded to the proxy
     * @param unit           time unit of both timeouts
     * @return the client proxy for the node
     * @throws IllegalArgumentException if the node, its host or its port is missing
     */
    public T getInstance(ClusterNode clusterNode, long connectTimeout, long readTimeout, TimeUnit unit) {
        requireValidNode(clusterNode);
        String key = clusterNode.toString();
        RPCClientProxy<T> proxy = findCached(key);
        if (proxy == null) {
            proxy = createAndCache(clusterNode, key);
        }
        return proxy.clientProxy(connectTimeout, readTimeout, unit);
    }

    /**
     * Convenience overload that wraps {@code host} and {@code port} in a {@link ClusterNode}.
     *
     * @param host           target host
     * @param port           target port
     * @param connectTimeout connect timeout forwarded to the proxy
     * @param readTimeout    read timeout forwarded to the proxy
     * @param unit           time unit of both timeouts
     * @return the client proxy for the node
     * @throws IllegalArgumentException if the resulting node has no host
     */
    public T getInstance(String host, int port, long connectTimeout, long readTimeout, TimeUnit unit) {
        return getInstance(new ClusterNode(host, port), connectTimeout, readTimeout, unit);
    }

    /**
     * Convenience overload that wraps {@code host} and {@code port} in a {@link ClusterNode}.
     *
     * @param host target host
     * @param port target port
     * @return the client proxy for the node
     * @throws IllegalArgumentException if the resulting node has no host
     */
    public T getInstance(String host, int port) {
        return getInstance(new ClusterNode(host, port));
    }

    /**
     * Destroys every proxy currently held by the cache.
     *
     * <p>Every proxy is attempted even if destroying an earlier one fails with a runtime exception;
     * the first such failure is rethrown afterwards with any later ones attached as suppressed.
     * The cache entries themselves are not removed here.
     *
     * @throws IOException declared by {@link Closeable}
     */
    @Override
    public void close() throws IOException {
        RuntimeException firstFailure = null;
        for (Map.Entry<String, RPCClientProxy<T>> entry : cacheManager.entrySet()) {
            RPCClientProxy<T> proxy = entry.getValue();
            if (proxy == null) {
                continue;
            }
            try {
                proxy.destroyProxy();
            } catch (RuntimeException e) {
                // Keep going so one broken proxy does not leak all the others.
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /** Rejects a node that is null or lacks a host or a port. */
    private static void requireValidNode(ClusterNode clusterNode) {
        if (clusterNode == null || StringUtils.isEmpty(clusterNode.getHost()) || clusterNode.getPort() == null) {
            throw new IllegalArgumentException("错误的节点");
        }
    }

    /**
     * Returns the cached proxy for {@code key}, or {@code null} when there is none. The value is
     * read once and null-checked by callers, because the entry may disappear between the
     * existence check and the read.
     */
    private RPCClientProxy<T> findCached(String key) {
        return cacheManager.exist(key) ? cacheManager.get(key) : null;
    }

    /** Creates a proxy for the node and stores it in the cache under {@code key}. */
    private RPCClientProxy<T> createAndCache(ClusterNode clusterNode, String key) {
        RPCClientProxy<T> proxy = new RPCClientProxy<>(clusterNode.getHost(), clusterNode.getPort(), type);
        cacheManager.set(key, proxy);
        return proxy;
    }
}
